package com.alinesno.cloud.compoment.kafka.impl;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

import com.alinesno.cloud.compoment.kafka.KafkaProducer;

import cn.hutool.core.lang.Snowflake;
import cn.hutool.core.util.IdUtil;

/**
 * Kafka message sending implementation.
 *
 * <p>Sends string messages through the injected {@link KafkaTemplate}, either
 * asynchronously (result reported via a callback) or by blocking on the send
 * future for a bounded time. Outcomes are written to the log; failures are not
 * rethrown to the caller.
 *
 * @author WeiXiaoJin
 * @since 2019-04-09 11:13:24
 */
public class KafkaComponentServiceImpl implements KafkaProducer {

	private static final Logger log = LoggerFactory.getLogger(KafkaComponentServiceImpl.class);

	/** Maximum time, in seconds, that a blocking send waits for the send future to complete. */
	private static final long SEND_TIMEOUT_SECONDS = 10L;

	@Autowired
	private KafkaTemplate<String, String> template;

	/**
	 * Generates record keys when the caller does not supply one.
	 * NOTE(review): created with fixed ids (1, 1); presumably several application
	 * instances sharing these ids could produce colliding keys - confirm whether that matters.
	 */
	private final Snowflake snowflake = IdUtil.createSnowflake(1, 1);

	/**
	 * Sends a message asynchronously, using a generated Snowflake id as the record key.
	 *
	 * @param topic destination topic
	 * @param data  message payload
	 */
	@Override
	public void asyncSendMessage(String topic, String data) {
		String key = snowflake.nextId() + "";
		this.asyncSendMessage(topic, key, data);
	}

	/**
	 * Sends a message and waits for the result, using a generated Snowflake id as the record key.
	 *
	 * @param topic destination topic
	 * @param data  message payload
	 * @return {@code true} if the send completed successfully within the timeout, {@code false} otherwise
	 */
	@Override
	public boolean blockSendMessage(String topic, String data) {
		String key = snowflake.nextId() + "";
		return this.blockSendMessage(topic, key, data);
	}

	/**
	 * Sends a message asynchronously with an explicit record key. The outcome is
	 * only logged, from the callback registered on the send future.
	 *
	 * @param topic destination topic
	 * @param key   record key
	 * @param data  message payload
	 */
	@Override
	public void asyncSendMessage(String topic, String key, String data) {
		log.debug("topic{} , data{}", topic, data);

		final ProducerRecord<String, String> record = createRecord(topic, key, data);

		ListenableFuture<SendResult<String, String>> future = template.send(record);
		future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {

			@Override
			public void onSuccess(SendResult<String, String> result) {
				handleSuccess(data);
			}

			@Override
			public void onFailure(Throwable ex) {
				handleFailure(data, record, ex);
			}

		});
	}

	/**
	 * Sends a message with an explicit record key and blocks until the send future
	 * completes or {@value #SEND_TIMEOUT_SECONDS} seconds elapse.
	 *
	 * <p>Failures are logged rather than thrown. If the waiting thread is
	 * interrupted, its interrupt status is restored before returning.
	 *
	 * @param topic destination topic
	 * @param key   record key
	 * @param data  message payload
	 * @return {@code true} if the send completed successfully within the timeout;
	 *         {@code false} if it failed, timed out, or the wait was interrupted
	 */
	@Override
	public boolean blockSendMessage(String topic, String key, String data) {
		final ProducerRecord<String, String> record = createRecord(topic, key, data);

		try {
			template.send(record).get(SEND_TIMEOUT_SECONDS, TimeUnit.SECONDS);
			handleSuccess(data);
			return true;
		} catch (ExecutionException e) {
			// Report the underlying failure; fall back to the wrapper if no cause is attached.
			Throwable cause = e.getCause() != null ? e.getCause() : e;
			handleFailure(data, record, cause);
		} catch (TimeoutException e) {
			handleFailure(data, record, e);
		} catch (InterruptedException e) {
			// Restore the interrupt flag so code further up the stack can observe it.
			Thread.currentThread().interrupt();
			handleFailure(data, record, e);
		}
		return false;
	}

	/**
	 * Builds the producer record for the given topic, key and payload.
	 */
	private ProducerRecord<String, String> createRecord(String topic, String key, String data) {
		return new ProducerRecord<String, String>(topic, key, data);
	}

	/**
	 * Logs a failed send at error level.
	 *
	 * @param data   message payload (currently not included in the log line; the record is logged instead)
	 * @param record the record that could not be sent
	 * @param cause  the failure
	 */
	private void handleFailure(String data, ProducerRecord<String, String> record, Throwable cause) {
		// The trailing Throwable is taken by the logger as the exception to print, so the
		// second placeholder needs its own argument to be filled in.
		log.error("消息record:{} ,发送失败:{}", record, String.valueOf(cause), cause);
	}

	/**
	 * Logs a successful send at debug level.
	 *
	 * @param data message payload that was sent
	 */
	private void handleSuccess(String data) {
		log.debug("消息发送成功:{}", data);
	}

}
